These needed some updates and workarounds.
They are now part of the testsuite and can
be run like this:
meson test -Cbuild --suite headless
+++ /dev/null
-import sys
-import subprocess
-import gi
-
-gi.require_version('Gdk', '4.0')
-gi.require_version('Gtk', '4.0')
-
-from gi.repository import GLib, GObject, Gdk, Gtk
-from pydbus import SessionBus
-
-verbose = True
-
-remote_desktop = None
-screen_cast = None
-session = None
-stream_path = None
-done = False
-
-def terminate():
- sys.exit(1)
-
-loop = None
-
-def quit_cb(loop):
- loop.quit()
-
-def wait(millis):
- global loop
- loop = GLib.MainLoop()
- GLib.timeout_add(millis, quit_cb, loop)
- loop.run()
-
-display = None
-window = None
-expected_change = None
-
-def key_pressed_cb (controller, keyval, keycode, state):
- global expected_change
- global loop
-
- if verbose:
- print(f'got key press: {keyval}, state {state}')
- assert expected_change != None, "Unexpected key press"
- assert expected_change['type'] == 'press', "Key press event expected"
- assert keyval == expected_change['keyval'], "Unexpected keyval in key press event"
- assert state == expected_change['state'], "Unexpected state in key press event"
-
- expected_change = None
- loop.quit()
-
-def key_released_cb (controller, keyval, keycode, state):
- global expected_change
- global loop
-
- if verbose:
- print(f'got key release: {keyval}, state {state}')
- assert expected_change != None, "Unexpected key release"
- assert expected_change['type'] == 'release', "Key release event expected"
- assert keyval == expected_change['keyval'], "Unexpected keyval in key release event"
- assert state == expected_change['state'], "Unexpected state in key release event"
-
- expected_change = None
- loop.quit()
-
-def motion_cb (controller, x, y):
- global expected_change
- global loop
-
- if verbose:
- print(f'got motion: {x}, {y}')
- if expected_change != None:
- assert expected_change['type'] == 'motion', "Motion event expected"
- assert x == expected_change['x'], "Unexpected x coord in motion event"
- assert y == expected_change['y'], "Unexpected y coord in motion event"
- expected_change = None
- loop.quit()
-
-def enter_cb (controller, x, y):
- global expected_change
- global loop
-
- if verbose:
- print(f'got enter: {x}, {y}')
- assert expected_change != None, "Unexpected enter"
- assert expected_change['type'] == 'enter', "Enter event expected"
- assert x == expected_change['x'], "Unexpected x coord in enter event"
- assert y == expected_change['y'], "Unexpected y coord in enter event"
-
- expected_change = None
- loop.quit()
-
-def pressed_cb(controller, n, x, y):
- global expected_change
- global loop
-
- if verbose:
- print(f'got pressed')
- assert expected_change != None, "Unexpected event"
- assert expected_change['type'] == 'press', "Button press expected"
- assert expected_change['button'] == controller.get_current_button(), "Unexpected button pressed"
- assert x == expected_change['x'], "Unexpected x coord in motion event"
- assert y == expected_change['y'], "Unexpected y coord in motion event"
-
- expected_change = None
- loop.quit()
-
-def released_cb(controller, n, x, y):
- global expected_change
- global loop
-
- if verbose:
- print(f'got released')
- assert expected_change != None, "Unexpected event"
- assert expected_change['type'] == 'release', "Button release expected"
-
- expected_change = None
- loop.quit()
-
-def expect_key_press(keyval, state, timeout):
- global expected_change
- expected_change = {
- 'type' : 'press',
- 'keyval' : keyval,
- 'state' : state
- }
- wait(timeout)
- assert expected_change == None, "Expected event did not happen"
-
-def expect_key_release(keyval, state, timeout):
- global expected_change
- expected_change = {
- 'type' : 'release',
- 'keyval' : keyval,
- 'state' : state
- }
- wait(timeout)
- assert expected_change == None, "Expected event did not happen"
-
-def expect_motion(x, y, timeout):
- global expected_change
- expected_change = {
- 'type' : 'motion',
- 'x' : x,
- 'y' : y
- }
- wait(timeout)
- assert expected_change == None, "Expected event did not happen"
-
-def expect_enter(x, y, timeout):
- global expected_change
- expected_change = {
- 'type' : 'enter',
- 'x' : x,
- 'y' : y
- }
- wait(timeout)
- assert expected_change == None, "Expected event did not happen"
-
-def expect_button_press(button, x, y, timeout):
- global expected_change
- expected_change = {
- 'type' : 'press',
- 'button' : button,
- 'x' : x,
- 'y' : y
- }
- wait(timeout)
- assert expected_change == None, "Button press did not arrive"
-
-def expect_button_release(button, x, y, timeout):
- global expected_change
- expected_change = {
- 'type' : 'release',
- 'button' : button,
- 'x' : x,
- 'y' : y
- }
- wait(timeout)
- assert expected_change == None, "Button release did not arrive"
-
-def got_active(object, pspec):
- global loop
- object.disconnect_by_func(got_active)
- loop.quit()
-
-def launch_observer():
- global display
- global window
-
- if verbose:
- print('launch observer')
-
- if display == None:
- Gdk.set_allowed_backends('wayland')
- display = Gdk.Display.open('gtk-test')
-
- window = Gtk.Window.new()
- window.set_display(display)
-
- controller = Gtk.EventControllerKey.new()
- controller.set_propagation_phase(Gtk.PropagationPhase.CAPTURE)
- controller.connect('key-pressed', key_pressed_cb)
- controller.connect('key-released', key_released_cb)
- window.add_controller(controller)
-
- controller = Gtk.EventControllerMotion.new()
- controller.set_propagation_phase(Gtk.PropagationPhase.CAPTURE)
- controller.connect('enter', enter_cb)
- controller.connect('motion', motion_cb)
- window.add_controller(controller)
-
- controller = Gtk.GestureClick.new()
- controller.set_propagation_phase(Gtk.PropagationPhase.CAPTURE)
- controller.connect('pressed', pressed_cb)
- controller.connect('released', released_cb)
- window.add_controller(controller)
-
- window.connect('notify::is-active', got_active)
- window.maximize()
- window.present()
-
- wait(500)
-
- assert window.is_active(), "Observer not active"
- assert window.get_width() == 1024, "Window not maximized"
- assert window.get_height() == 768, "Window not maximized"
-
- # we need to wait out the map animation, or pointer coords will be off
- wait(1000)
-
-def stop_observer():
- global window
- window.destroy()
- window = None
-
-def key_press(keyval):
- if verbose:
- print(f'press key {keyval}')
- session.NotifyKeyboardKeysym(keyval, True)
-
-def key_release(keyval):
- if verbose:
- print(f'release key {keyval}')
- session.NotifyKeyboardKeysym(keyval, False)
-
-buttons = {
- 1 : 0x110,
- 2 : 0x111,
- 3 : 0x112
-}
-
-def button_press(button):
- if verbose:
- print(f'press button {button}')
- session.NotifyPointerButton(buttons[button], True)
-
-def button_release(button):
- if verbose:
- print(f'relase button {button}')
- session.NotifyPointerButton(buttons[button], False)
-
-def pointer_move(x, y):
- if verbose:
- print(f'pointer move {x} {y}')
- session.NotifyPointerMotionAbsolute(stream_path, x, y)
-
-def basic_keyboard_tests():
- try:
- launch_observer()
-
- key_press(Gdk.KEY_a)
- expect_key_press(keyval=Gdk.KEY_a, state=0, timeout=100)
-
- key_release(Gdk.KEY_a)
- expect_key_release(keyval=Gdk.KEY_a, state=0, timeout=100)
-
- key_press(Gdk.KEY_Control_L)
- expect_key_press(keyval=Gdk.KEY_Control_L, state=0, timeout=100)
-
- key_press(Gdk.KEY_x)
- expect_key_press(keyval=Gdk.KEY_x, state=Gdk.ModifierType.CONTROL_MASK, timeout=100)
-
- key_release(Gdk.KEY_Control_L)
- expect_key_release(keyval=Gdk.KEY_Control_L, state=Gdk.ModifierType.CONTROL_MASK, timeout=100)
-
- key_release(Gdk.KEY_x)
- expect_key_release(keyval=Gdk.KEY_x, state=0, timeout=100)
-
- stop_observer()
- except AssertionError as e:
- print("Error in basic_keyboard_tests: {0}".format(e))
- terminate()
-
-def basic_pointer_tests():
- try:
- pointer_move(-100.0, -100.0)
- launch_observer()
-
- # observer window is maximized, so window coords == global coords
- pointer_move(500.0, 300.0)
- expect_enter(x=500, y=300, timeout=200)
-
- pointer_move(400.0, 200.0)
- expect_motion(x=400, y=200, timeout=200)
-
- button_press(1)
- expect_button_press(button=1, x=400, y=200, timeout=200)
-
- pointer_move(220.0, 200.0)
- expect_motion(x=220, y=200, timeout=200)
-
- button_release(1)
- expect_button_release(button=1, x=220, y=200, timeout=200)
-
- stop_observer()
- except AssertionError as e:
- print("Error in basic_pointer_tests: {0}".format(e))
- terminate()
-
-ds_window = None
-ds = None
-
-def drag_begin(controller, drag):
- global expected_change
- global loop
-
- if verbose:
- print(f'got drag begin')
- assert expected_change != None, "Unexpected drag begin"
- assert expected_change['type'] == 'drag', "Drag begin expected"
-
- expected_change = None
- loop.quit()
-
-def launch_drag_source(value):
- global display
- global ds_window
- global ds
-
- if verbose:
- print('launch drag source')
-
- if display == None:
- Gdk.set_allowed_backends('wayland')
- display = Gdk.Display.open('gtk-test')
-
- ds_window = Gtk.Window.new()
- ds_window.set_title('Drag Source')
- ds_window.set_display(display)
-
- ds = Gtk.DragSource.new()
- ds.set_content(Gdk.ContentProvider.new_for_value(value))
- ds_window.add_controller(ds)
- ds.connect('drag-begin', drag_begin)
-
- controller = Gtk.GestureClick.new()
- controller.set_propagation_phase(Gtk.PropagationPhase.CAPTURE)
- controller.connect('pressed', pressed_cb)
- controller.connect('released', released_cb)
- ds_window.add_controller(controller)
-
- ds_window.connect('notify::is-active', got_active)
- ds_window.maximize()
- ds_window.present()
-
- wait(500)
-
- assert ds_window.is_active(), "drag source not active"
- assert ds_window.get_width() == 1024, "Window not maximized"
- assert ds_window.get_height() == 768, "Window not maximized"
-
- # we need to wait out the map animation, or pointer coords will be off
- wait(1000)
-
-def stop_drag_source():
- global ds_window
- ds_window.destroy()
- ds_window = None
-
-dt_window = None
-
-def do_drop(controller, value, x, y):
- global expected_change
- global loop
-
- if verbose:
- print(f'got drop {value}')
- assert expected_change != None, "Unexpected drop begin"
- assert expected_change['type'] == 'drop', "Drop expected"
- assert expected_change['value'] == value, "Unexpected value dropped"
-
- expected_change = None
- loop.quit()
-
-def launch_drop_target():
- global dt_window
-
- if verbose:
- print('launch drop target')
-
- dt_window = Gtk.Window.new()
- dt_window.set_title('Drop Target')
- dt_window.set_display(display)
-
- controller = Gtk.DropTarget.new(GObject.TYPE_STRING, Gdk.DragAction.COPY)
- dt_window.add_controller(controller)
- controller.connect('drop', do_drop)
-
- dt_window.connect('notify::is-active', got_active)
- dt_window.maximize()
- dt_window.present()
-
- wait(500)
-
- assert dt_window.is_active(), "drop target not active"
- assert dt_window.get_width() == 1024, "Window not maximized"
- assert dt_window.get_height() == 768, "Window not maximized"
-
- # we need to wait out the map animation, or pointer coords will be off
- wait(1000)
-
-def stop_drop_target():
- global dt_window
- dt_window.destroy()
- dt_window = None
-
-def expect_drag(timeout):
- global expected_change
- expected_change = {
- 'type' : 'drag',
- }
- wait(timeout)
- assert expected_change == None, "DND operation not started"
-
-def expect_drop(value, timeout):
- global expected_change
- expected_change = {
- 'type' : 'drop',
- 'value' : value
- }
- wait(timeout)
- assert expected_change == None, "Drop has not happened"
-
-def dnd_tests():
- try:
- launch_drag_source('abc')
-
- pointer_move(100, 100)
- button_press(1)
- expect_button_press(button=1, x=100, y=100, timeout=300)
-
- pointer_move(120, 150)
- expect_drag(timeout=1000)
-
- launch_drop_target()
- button_release(1)
- expect_drop('abc', timeout=200)
-
- stop_drop_target()
- stop_drag_source()
- except AssertionError as e:
- print("Error in dnd_tests: {0}".format(e))
- terminate()
-
-def session_closed_cb():
- print('Session closed')
-
-def mutter_appeared(name):
- global remote_desktop
- global session
- global stream_path
- global done
-
- if verbose:
- print("mutter appeared on the bus")
-
- remote_desktop = bus.get('org.gnome.Mutter.RemoteDesktop',
- '/org/gnome/Mutter/RemoteDesktop')
- device_types = remote_desktop.Get('org.gnome.Mutter.RemoteDesktop', 'SupportedDeviceTypes')
- assert device_types & 1 == 1, "No keyboard"
- assert device_types & 2 == 2, "No pointer"
-
- screen_cast = bus.get('org.gnome.Mutter.ScreenCast',
- '/org/gnome/Mutter/ScreenCast')
-
- session_path = remote_desktop.CreateSession()
- session = bus.get('org.gnome.Mutter.RemoteDesktop', session_path)
- session.onClosed = session_closed_cb
-
- screen_cast_session_path = screen_cast.CreateSession({ 'remote-desktop-session-id' : GLib.Variant('s', session.SessionId)})
- screen_cast_session = bus.get('org.gnome.Mutter.ScreenCast', screen_cast_session_path)
-
- stream_path = screen_cast_session.RecordMonitor('Meta-0', {})
- session.Start()
-
- basic_keyboard_tests()
- basic_pointer_tests()
- dnd_tests()
-
- session.Stop()
-
- done = True
-
-def mutter_vanished():
- global done
- if remote_desktop != None:
- if verbose:
- print("mutter left the bus")
- done = True
-
-bus = SessionBus()
-bus.watch_name('org.gnome.Mutter.RemoteDesktop', 0, mutter_appeared, mutter_vanished)
-
-try:
- while not done:
- GLib.MainContext.default().iteration(True)
-except KeyboardInterrupt:
- print('Interrupted')
+++ /dev/null
-import sys
-import subprocess
-import gi
-
-gi.require_version('Gdk', '4.0')
-
-from gi.repository import GLib, Gdk
-from pydbus import SessionBus
-
-verbose = True
-
-screen_cast = None
-monitors = {}
-waiting = False
-done = False
-monitor_model = None
-
-def terminate():
- for key in monitors:
- monitor = monitors[key];
- pipeline = monitor['pipeline'];
- pipeline.terminate()
- sys.exit(1)
-
-def stream_added_closure(name):
- def stream_added(node_id):
- monitor = monitors[name];
-
- freq = monitor['freq'];
- width = monitor['width'];
- height = monitor['height'];
- # FIXME scale = monitor['scale'];
-
- # Use gstreamer out-of-process, since the gst gl support gets
- # itself into a twist with its wayland connection when monitors
- # disappear
- pipeline_desc = f'gst-launch-1.0 pipewiresrc path={node_id} ! video/x-raw,max-framerate={freq}/1,width={width},height={height} ! videoconvert ! glimagesink'
- if verbose:
- print(f'launching {pipeline_desc}')
- monitor['pipeline'] = subprocess.Popen([pipeline_desc], shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
-
- return stream_added
-
-def add_monitor(name, width, height, scale, freq):
- if verbose:
- print(f'add monitor {name}: {width}x{height}, scale {scale}, frequency {freq}')
- session_path = screen_cast.CreateSession({})
- session = bus.get('org.gnome.Mutter.ScreenCast', session_path)
- monitors[name] = {
- "session": session,
- "width": width,
- "height": height,
- "scale": scale,
- "freq": freq
- }
- stream_path = session.RecordVirtual({})
- stream = bus.get('org.gnome.Mutter.ScreenCast', stream_path)
- stream.onPipeWireStreamAdded = stream_added_closure(name)
- session.Start()
-
-def remove_monitor(name):
- if verbose:
- print(f'remove monitor {name}')
- try:
- monitor = monitors[name];
- pipeline = monitor['pipeline']
- pipeline.kill()
- session = monitor['session']
- session.Stop()
- except KeyError:
- print("failed to remove monitor")
- monitors[name] = None
-
-expected_change = None
-loop = None
-
-def quit_cb(loop):
- loop.quit()
-
-def wait(millis):
- global loop
- loop = GLib.MainLoop()
- GLib.timeout_add(millis, quit_cb, loop)
- loop.run()
-
-def monitors_changed(monitors, position, removed, added):
- global expected_change
-
- assert expected_change != None, "No change expected"
- assert position == expected_change['position'], "Unexpected position in monitors-changed"
- assert removed == expected_change['removed'], "Unexpected removed in monitors-changed"
- assert added == expected_change['added'], "Unexpected added in monitors-changed"
-
- if verbose:
- print('got expected change')
-
- expected_change = None
- loop.quit()
-
-def launch_observer():
- global monitor_model
-
- if verbose:
- print('launch observer')
-
- Gdk.set_allowed_backends('wayland')
- display = Gdk.Display.open('gtk-test')
-
- monitor_model = display.get_monitors()
- assert monitor_model.get_n_items() == 0, "Unexpected initial monitors"
- monitor_model.connect('items-changed', monitors_changed)
-
-def expect_monitors_changed(position, removed, added, timeout):
- global expected_change
- expected_change = {
- 'position' : position,
- 'removed' : removed,
- 'added' : added
- }
- wait(timeout)
- assert expected_change == None, "Expected change did not happen"
-
-def got_connector(monitor, pspec):
- loop.quit()
-
-def expect_monitor(position, width, height, scale, freq):
- assert monitor_model.get_n_items() > position, f'Monitor {position} not present'
- monitor = monitor_model.get_item(position)
- if monitor.get_connector() == None:
- handler = monitor.connect('notify::connector', got_connector)
- wait(500)
- monitor.disconnect(handler)
- assert monitor.is_valid(), "Monitor is not valid"
- geometry = monitor.get_geometry()
- assert geometry.width == width, "Unexpected monitor width"
- assert geometry.height == height, "Unexpected monitor height"
- assert monitor.get_scale_factor() == scale, "Unexpected scale factor"
- assert monitor.get_refresh_rate() == freq, "Unexpected monitor frequency"
- if verbose:
- print(f'monitor {position}: {geometry.width}x{geometry.height} frequency {monitor.get_refresh_rate()} scale {monitor.get_scale_factor()} model \'{monitor.get_model()}\' connector \'{monitor.get_connector()}\'')
-
-def run_commands():
- try:
- launch_observer()
-
- add_monitor("0", width=100, height=100, scale=1, freq=60)
- expect_monitors_changed(0, 0, 1, 1000)
- expect_monitor (position=0, width=100, height=100, scale=1, freq=60000)
-
- add_monitor("1", width=1024, height=768, scale=1, freq=144)
- expect_monitors_changed(1, 0, 1, 1000)
- expect_monitor (position=1, width=1024, height=768, scale=1, freq=144000)
-
- remove_monitor("0")
- expect_monitors_changed(0, 1, 0, 11000) # mutter takes 10 seconds to remove it
-
- remove_monitor("1")
- expect_monitors_changed(0, 1, 0, 11000)
- except AssertionError as e:
- print("Error: {0}".format(e))
- terminate()
-
-def mutter_appeared(name):
- global screen_cast
- global done
- if verbose:
- print("mutter appeared on the bus")
- screen_cast = bus.get('org.gnome.Mutter.ScreenCast',
- '/org/gnome/Mutter/ScreenCast')
- run_commands()
- done = True
-
-def mutter_vanished():
- global done
- if screen_cast != None:
- if verbose:
- print("mutter left the bus")
- done = True
-
-bus = SessionBus()
-bus.watch_name('org.gnome.Mutter.ScreenCast', 0, mutter_appeared, mutter_vanished)
-
-try:
- while not done:
- GLib.MainContext.default().iteration(True)
-except KeyboardInterrupt:
- print('Interrupted')
+++ /dev/null
-#! /bin/sh
-
-builddir=$(pwd)/build
-
-dbus-run-session sh <<EOF
-
-#echo DBUS_SESSION_BUS_ADDRESS=\$DBUS_SESSION_BUS_ADDRESS
-#echo WAYLAND_DISPLAY=gtk-test
-
-mutter --headless --virtual-monitor 1024x768 --no-x11 --wayland-display gtk-test >&mutter.log &
-pid=\$!
-
-export WAYLAND_DISPLAY=gtk-test
-export GDK_BACKEND=wayland
-export GTK_A11Y=none
-#export WAYLAND_DEBUG=1
-
-export GI_TYPELIB_PATH=$builddir/gtk:/usr/lib64/girepository-1.0
-export LD_PRELOAD=$builddir/gtk/libgtk-4.so
-
-python tests/headless-input-tests.py
-status=\$?
-
-kill \$pid
-
-exit \$status
-
-EOF
+++ /dev/null
-#! /bin/sh
-
-builddir=$(pwd)/build
-
-dbus-run-session sh <<EOF
-
-# echo DBUS_SESSION_BUS_ADDRESS=\$DBUS_SESSION_BUS_ADDRESS
-# echo WAYLAND_DISPLAY=gtk-test
-
-mutter --headless --no-x11 --wayland-display gtk-test >&mutter.log &
-pid=\$!
-
-export WAYLAND_DISPLAY=gtk-test
-export GDK_BACKEND=wayland
-
-export GI_TYPELIB_PATH=$builddir/gtk:/usr/lib64/girepository-1.0
-export LD_PRELOAD=$builddir/gtk/libgtk-4.so
-
-python tests/headless-monitor-tests.py
-status=\$?
-
-kill \$pid
-
-exit \$status
-
-EOF
--- /dev/null
+import sys
+import os
+import subprocess
+import gi
+
+gi.require_version('Gdk', '4.0')
+gi.require_version('Gtk', '4.0')
+
+from gi.repository import GLib, GObject, Gdk, Gtk
+from pydbus import SessionBus
+
+verbose = True
+
+remote_desktop = None
+screen_cast = None
+session = None
+stream_path = None
+done = False
+
+def terminate():
+ sys.exit(1)
+
+loop = None
+
+def quit_cb(loop):
+ loop.quit()
+
+def wait(millis):
+ global loop
+ loop = GLib.MainLoop()
+ GLib.timeout_add(millis, quit_cb, loop)
+ loop.run()
+
+display = None
+window = None
+expected_change = None
+
+def key_pressed_cb (controller, keyval, keycode, state):
+ global expected_change
+ global loop
+
+ if verbose:
+ print(f'got key press: {keyval}, state {state}')
+ assert expected_change != None, "Unexpected key press"
+ assert expected_change['type'] == 'press', "Key press event expected"
+ assert keyval == expected_change['keyval'], "Unexpected keyval in key press event"
+ assert state == expected_change['state'], "Unexpected state in key press event"
+
+ expected_change = None
+ loop.quit()
+
+def key_released_cb (controller, keyval, keycode, state):
+ global expected_change
+ global loop
+
+ if verbose:
+ print(f'got key release: {keyval}, state {state}')
+ assert expected_change != None, "Unexpected key release"
+ assert expected_change['type'] == 'release', "Key release event expected"
+ assert keyval == expected_change['keyval'], "Unexpected keyval in key release event"
+ assert state == expected_change['state'], "Unexpected state in key release event"
+
+ expected_change = None
+ loop.quit()
+
+def motion_cb (controller, x, y):
+ global expected_change
+ global loop
+
+ if verbose:
+ print(f'got motion: {x}, {y}')
+ if expected_change != None:
+ assert expected_change['type'] == 'motion', "Motion event expected"
+ assert x == expected_change['x'], "Unexpected x coord in motion event"
+ assert y == expected_change['y'], "Unexpected y coord in motion event"
+ expected_change = None
+ loop.quit()
+
+def enter_cb (controller, x, y):
+ global expected_change
+ global loop
+
+ if verbose:
+ print(f'got enter: {x}, {y}')
+ assert expected_change != None, "Unexpected enter"
+ assert expected_change['type'] == 'enter', "Enter event expected"
+ assert x == expected_change['x'], "Unexpected x coord in enter event"
+ assert y == expected_change['y'], "Unexpected y coord in enter event"
+
+ expected_change = None
+ loop.quit()
+
+def pressed_cb(controller, n, x, y):
+ global expected_change
+ global loop
+
+ if verbose:
+ print(f'got pressed')
+ assert expected_change != None, "Unexpected event"
+ assert expected_change['type'] == 'press', "Button press expected"
+ assert expected_change['button'] == controller.get_current_button(), "Unexpected button pressed"
+ assert x == expected_change['x'], "Unexpected x coord in motion event"
+ assert y == expected_change['y'], "Unexpected y coord in motion event"
+
+ expected_change = None
+ loop.quit()
+
+def released_cb(controller, n, x, y):
+ global expected_change
+ global loop
+
+ if verbose:
+ print(f'got released')
+ assert expected_change != None, "Unexpected event"
+ assert expected_change['type'] == 'release', "Button release expected"
+
+ expected_change = None
+ loop.quit()
+
+def expect_key_press(keyval, state, timeout):
+ global expected_change
+ expected_change = {
+ 'type' : 'press',
+ 'keyval' : keyval,
+ 'state' : state
+ }
+ wait(timeout)
+ assert expected_change == None, "Expected event did not happen"
+
+def expect_key_release(keyval, state, timeout):
+ global expected_change
+ expected_change = {
+ 'type' : 'release',
+ 'keyval' : keyval,
+ 'state' : state
+ }
+ wait(timeout)
+ assert expected_change == None, "Expected event did not happen"
+
+def expect_motion(x, y, timeout):
+ global expected_change
+ expected_change = {
+ 'type' : 'motion',
+ 'x' : x,
+ 'y' : y
+ }
+ wait(timeout)
+ assert expected_change == None, "Expected event did not happen"
+
+def expect_enter(x, y, timeout):
+ global expected_change
+ expected_change = {
+ 'type' : 'enter',
+ 'x' : x,
+ 'y' : y
+ }
+ wait(timeout)
+ assert expected_change == None, "Expected event did not happen"
+
+def expect_button_press(button, x, y, timeout):
+ global expected_change
+ expected_change = {
+ 'type' : 'press',
+ 'button' : button,
+ 'x' : x,
+ 'y' : y
+ }
+ wait(timeout)
+ assert expected_change == None, "Button press did not arrive"
+
+def expect_button_release(button, x, y, timeout):
+ global expected_change
+ expected_change = {
+ 'type' : 'release',
+ 'button' : button,
+ 'x' : x,
+ 'y' : y
+ }
+ wait(timeout)
+ assert expected_change == None, "Button release did not arrive"
+
+def got_active(object, pspec):
+ global loop
+ object.disconnect_by_func(got_active)
+ loop.quit()
+
+def launch_observer():
+ global display
+ global window
+
+ if verbose:
+ print('launch observer')
+
+ if display == None:
+ display = Gdk.Display.open(os.getenv('WAYLAND_DISPLAY'))
+
+ window = Gtk.Window.new()
+
+ controller = Gtk.EventControllerKey.new()
+ controller.set_propagation_phase(Gtk.PropagationPhase.CAPTURE)
+ controller.connect('key-pressed', key_pressed_cb)
+ controller.connect('key-released', key_released_cb)
+ window.add_controller(controller)
+
+ controller = Gtk.EventControllerMotion.new()
+ controller.set_propagation_phase(Gtk.PropagationPhase.CAPTURE)
+ controller.connect('enter', enter_cb)
+ controller.connect('motion', motion_cb)
+ window.add_controller(controller)
+
+ controller = Gtk.GestureClick.new()
+ controller.set_propagation_phase(Gtk.PropagationPhase.CAPTURE)
+ controller.connect('pressed', pressed_cb)
+ controller.connect('released', released_cb)
+ window.add_controller(controller)
+
+ window.connect('notify::is-active', got_active)
+ window.maximize()
+ window.present()
+
+ wait(500)
+
+ assert window.is_active(), "Observer not active"
+ assert window.get_width() == 1024, "Window not maximized"
+ assert window.get_height() == 768, "Window not maximized"
+
+ # we need to wait out the map animation, or pointer coords will be off
+ wait(1000)
+
+def stop_observer():
+ global window
+ window.destroy()
+ window = None
+
+def key_press(keyval):
+ if verbose:
+ print(f'press key {keyval}')
+ session.NotifyKeyboardKeysym(keyval, True)
+
+def key_release(keyval):
+ if verbose:
+ print(f'release key {keyval}')
+ session.NotifyKeyboardKeysym(keyval, False)
+
+buttons = {
+ 1 : 0x110,
+ 2 : 0x111,
+ 3 : 0x112
+}
+
+def button_press(button):
+ if verbose:
+ print(f'press button {button}')
+ session.NotifyPointerButton(buttons[button], True)
+
+def button_release(button):
+ if verbose:
+ print(f'relase button {button}')
+ session.NotifyPointerButton(buttons[button], False)
+
+def pointer_move(x, y):
+ if verbose:
+ print(f'pointer move {x} {y}')
+ session.NotifyPointerMotionAbsolute(stream_path, x, y)
+
+def basic_keyboard_tests():
+ try:
+ launch_observer()
+
+ key_press(Gdk.KEY_a)
+ expect_key_press(keyval=Gdk.KEY_a, state=0, timeout=100)
+
+ key_release(Gdk.KEY_a)
+ expect_key_release(keyval=Gdk.KEY_a, state=0, timeout=100)
+
+ key_press(Gdk.KEY_Control_L)
+ expect_key_press(keyval=Gdk.KEY_Control_L, state=0, timeout=100)
+
+ key_press(Gdk.KEY_x)
+ expect_key_press(keyval=Gdk.KEY_x, state=Gdk.ModifierType.CONTROL_MASK, timeout=100)
+
+ key_release(Gdk.KEY_Control_L)
+ expect_key_release(keyval=Gdk.KEY_Control_L, state=Gdk.ModifierType.CONTROL_MASK, timeout=100)
+
+ key_release(Gdk.KEY_x)
+ expect_key_release(keyval=Gdk.KEY_x, state=0, timeout=100)
+
+ stop_observer()
+ except AssertionError as e:
+ print("Error in basic_keyboard_tests: {0}".format(e))
+ terminate()
+
+def basic_pointer_tests():
+ try:
+ pointer_move(-100.0, -100.0)
+ launch_observer()
+
+ # observer window is maximized, so window coords == global coords
+ pointer_move(500.0, 300.0)
+ expect_enter(x=500, y=300, timeout=200)
+
+ pointer_move(400.0, 200.0)
+ expect_motion(x=400, y=200, timeout=200)
+
+ button_press(1)
+ expect_button_press(button=1, x=400, y=200, timeout=200)
+
+ pointer_move(220.0, 200.0)
+ expect_motion(x=220, y=200, timeout=200)
+
+ button_release(1)
+ expect_button_release(button=1, x=220, y=200, timeout=200)
+
+ stop_observer()
+ except AssertionError as e:
+ print("Error in basic_pointer_tests: {0}".format(e))
+ terminate()
+
+ds_window = None
+ds = None
+
+def drag_begin(controller, drag):
+ global expected_change
+ global loop
+
+ if verbose:
+ print(f'got drag begin')
+ assert expected_change != None, "Unexpected drag begin"
+ assert expected_change['type'] == 'drag', "Drag begin expected"
+
+ expected_change = None
+ loop.quit()
+
+def launch_drag_source(value):
+ global display
+ global ds_window
+ global ds
+
+ if verbose:
+ print('launch drag source')
+
+ if display == None:
+ display = Gdk.Display.open(os.getenv('WAYLAND_DISPLAY'))
+
+ ds_window = Gtk.Window.new()
+ ds_window.set_title('Drag Source')
+
+ ds = Gtk.DragSource.new()
+ ds.set_content(Gdk.ContentProvider.new_for_value(value))
+ ds_window.add_controller(ds)
+ ds.connect('drag-begin', drag_begin)
+
+ controller = Gtk.GestureClick.new()
+ controller.set_propagation_phase(Gtk.PropagationPhase.CAPTURE)
+ controller.connect('pressed', pressed_cb)
+ controller.connect('released', released_cb)
+ ds_window.add_controller(controller)
+
+ ds_window.connect('notify::is-active', got_active)
+ ds_window.maximize()
+ ds_window.present()
+
+ wait(500)
+
+ assert ds_window.is_active(), "drag source not active"
+ assert ds_window.get_width() == 1024, "Window not maximized"
+ assert ds_window.get_height() == 768, "Window not maximized"
+
+ # we need to wait out the map animation, or pointer coords will be off
+ wait(1000)
+
+def stop_drag_source():
+ global ds_window
+ ds_window.destroy()
+ ds_window = None
+
+dt_window = None
+
+def do_drop(controller, value, x, y):
+ global expected_change
+ global loop
+
+ if verbose:
+ print(f'got drop {value}')
+ assert expected_change != None, "Unexpected drop begin"
+ assert expected_change['type'] == 'drop', "Drop expected"
+ assert expected_change['value'] == value, "Unexpected value dropped"
+
+ expected_change = None
+ loop.quit()
+
+def launch_drop_target():
+ global display
+ global dt_window
+
+ if verbose:
+ print('launch drop target')
+
+ if display == None:
+ display = Gdk.Display.open(os.getenv('WAYLAND_DISPLAY'))
+
+ dt_window = Gtk.Window.new()
+ dt_window.set_title('Drop Target')
+
+ controller = Gtk.DropTarget.new(GObject.TYPE_STRING, Gdk.DragAction.COPY)
+ dt_window.add_controller(controller)
+ controller.connect('drop', do_drop)
+
+ dt_window.connect('notify::is-active', got_active)
+ dt_window.maximize()
+ dt_window.present()
+
+ wait(500)
+
+ assert dt_window.is_active(), "drop target not active"
+ assert dt_window.get_width() == 1024, "Window not maximized"
+ assert dt_window.get_height() == 768, "Window not maximized"
+
+ # we need to wait out the map animation, or pointer coords will be off
+ wait(1000)
+
+def stop_drop_target():
+ global dt_window
+ dt_window.destroy()
+ dt_window = None
+
+def expect_drag(timeout):
+ global expected_change
+ expected_change = {
+ 'type' : 'drag',
+ }
+ wait(timeout)
+ assert expected_change == None, "DND operation not started"
+
+def expect_drop(value, timeout):
+ global expected_change
+ expected_change = {
+ 'type' : 'drop',
+ 'value' : value
+ }
+ wait(timeout)
+ assert expected_change == None, "Drop has not happened"
+
+def dnd_tests():
+ try:
+ launch_drag_source('abc')
+
+ pointer_move(100, 100)
+ button_press(1)
+ expect_button_press(button=1, x=100, y=100, timeout=300)
+ # need to wait out the MIN_TIME_TO_DND
+ wait(150)
+
+ pointer_move(120, 150)
+ expect_drag(timeout=1000)
+
+ launch_drop_target()
+ button_release(1)
+ expect_drop('abc', timeout=200)
+
+ stop_drop_target()
+ stop_drag_source()
+ except AssertionError as e:
+ print("Error in dnd_tests: {0}".format(e))
+ terminate()
+
+def session_closed_cb():
+ print('Session closed')
+
+def mutter_appeared(name):
+ global remote_desktop
+ global session
+ global stream_path
+ global done
+
+ if verbose:
+ print("mutter appeared on the bus")
+
+ remote_desktop = bus.get('org.gnome.Mutter.RemoteDesktop',
+ '/org/gnome/Mutter/RemoteDesktop')
+ device_types = remote_desktop.Get('org.gnome.Mutter.RemoteDesktop', 'SupportedDeviceTypes')
+ assert device_types & 1 == 1, "No keyboard"
+ assert device_types & 2 == 2, "No pointer"
+
+ screen_cast = bus.get('org.gnome.Mutter.ScreenCast',
+ '/org/gnome/Mutter/ScreenCast')
+
+ session_path = remote_desktop.CreateSession()
+ session = bus.get('org.gnome.Mutter.RemoteDesktop', session_path)
+ session.onClosed = session_closed_cb
+
+ screen_cast_session_path = screen_cast.CreateSession({ 'remote-desktop-session-id' : GLib.Variant('s', session.SessionId)})
+ screen_cast_session = bus.get('org.gnome.Mutter.ScreenCast', screen_cast_session_path)
+
+ stream_path = screen_cast_session.RecordMonitor('Meta-0', {})
+ session.Start()
+
+ # work around lack of initial devices
+ key_press(Gdk.KEY_Control_L)
+ key_release(Gdk.KEY_Control_L)
+ pointer_move(-100, -100)
+
+ basic_keyboard_tests()
+ basic_pointer_tests()
+ dnd_tests()
+
+ session.Stop()
+
+ done = True
+
+def mutter_vanished():
+ global done
+ if remote_desktop != None:
+ if verbose:
+ print("mutter left the bus")
+ done = True
+
+bus = SessionBus()
+bus.watch_name('org.gnome.Mutter.RemoteDesktop', 0, mutter_appeared, mutter_vanished)
+
+try:
+ while not done:
+ GLib.MainContext.default().iteration(True)
+except KeyboardInterrupt:
+ print('Interrupted')
--- /dev/null
+import sys
+import os
+import subprocess
+import gi
+
+gi.require_version('Gdk', '4.0')
+
+from gi.repository import GLib, Gdk
+from pydbus import SessionBus
+
+verbose = True
+
+screen_cast = None
+monitors = {}
+waiting = False
+done = False
+monitor_model = None
+display = None
+
+def terminate():
+ for key in monitors:
+ monitor = monitors[key];
+ pipeline = monitor['pipeline'];
+ pipeline.terminate()
+ sys.exit(1)
+
+def stream_added_closure(name):
+ def stream_added(node_id):
+ monitor = monitors[name];
+
+ freq = monitor['freq'];
+ width = monitor['width'];
+ height = monitor['height'];
+ # FIXME scale = monitor['scale'];
+
+ # Use gstreamer out-of-process, since the gst gl support gets
+ # itself into a twist with its wayland connection when monitors
+ # disappear
+ pipeline_desc = f'gst-launch-1.0 pipewiresrc path={node_id} ! video/x-raw,max-framerate={freq}/1,width={width},height={height} ! videoconvert ! glimagesink'
+ if verbose:
+ print(f'launching {pipeline_desc}')
+ monitor['pipeline'] = subprocess.Popen([pipeline_desc], shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
+
+ return stream_added
+
+def add_monitor(name, width, height, scale, freq):
+ if verbose:
+ print(f'add monitor {name}: {width}x{height}, scale {scale}, frequency {freq}')
+ session_path = screen_cast.CreateSession({})
+ session = bus.get('org.gnome.Mutter.ScreenCast', session_path)
+ monitors[name] = {
+ "session": session,
+ "width": width,
+ "height": height,
+ "scale": scale,
+ "freq": freq
+ }
+ stream_path = session.RecordVirtual({})
+ stream = bus.get('org.gnome.Mutter.ScreenCast', stream_path)
+ stream.onPipeWireStreamAdded = stream_added_closure(name)
+ session.Start()
+
+def remove_monitor(name):
+ if verbose:
+ print(f'remove monitor {name}')
+ try:
+ monitor = monitors[name];
+ pipeline = monitor['pipeline']
+ pipeline.kill()
+ session = monitor['session']
+ session.Stop()
+ except KeyError:
+ print("failed to remove monitor")
+ monitors[name] = None
+
+expected_change = None
+loop = None
+
+def quit_cb(loop):
+ loop.quit()
+
+def wait(millis):
+ global loop
+ loop = GLib.MainLoop()
+ GLib.timeout_add(millis, quit_cb, loop)
+ loop.run()
+
+def monitors_changed(monitors, position, removed, added):
+ global expected_change
+
+ assert expected_change != None, "No change expected"
+ assert position == expected_change['position'], "Unexpected position in monitors-changed"
+ assert removed == expected_change['removed'], "Unexpected removed in monitors-changed"
+ assert added == expected_change['added'], "Unexpected added in monitors-changed"
+
+ if verbose:
+ print('got expected change')
+
+ expected_change = None
+ loop.quit()
+
+def launch_observer():
+ global monitor_model
+ global display
+
+ if display == None:
+ display = Gdk.Display.open(os.getenv('WAYLAND_DISPLAY'))
+
+ if verbose:
+ print('launch observer')
+
+ monitor_model = display.get_monitors()
+ assert monitor_model.get_n_items() == 0, "Unexpected initial monitors"
+ monitor_model.connect('items-changed', monitors_changed)
+
+def expect_monitors_changed(position, removed, added, timeout):
+ global expected_change
+ expected_change = {
+ 'position' : position,
+ 'removed' : removed,
+ 'added' : added
+ }
+ wait(timeout)
+ assert expected_change == None, "Expected change did not happen"
+
+def got_connector(monitor, pspec):
+ loop.quit()
+
+def expect_monitor(position, width, height, scale, freq):
+ assert monitor_model.get_n_items() > position, f'Monitor {position} not present'
+ monitor = monitor_model.get_item(position)
+ if monitor.get_connector() == None:
+ handler = monitor.connect('notify::connector', got_connector)
+ wait(500)
+ monitor.disconnect(handler)
+ assert monitor.is_valid(), "Monitor is not valid"
+ geometry = monitor.get_geometry()
+ assert geometry.width == width, "Unexpected monitor width"
+ assert geometry.height == height, "Unexpected monitor height"
+ assert monitor.get_scale_factor() == scale, "Unexpected scale factor"
+ assert monitor.get_refresh_rate() == freq, "Unexpected monitor frequency"
+ if verbose:
+ print(f'monitor {position}: {geometry.width}x{geometry.height} frequency {monitor.get_refresh_rate()} scale {monitor.get_scale_factor()} model \'{monitor.get_model()}\' connector \'{monitor.get_connector()}\'')
+
+def run_commands():
+ try:
+ launch_observer()
+
+ add_monitor("0", width=100, height=100, scale=1, freq=60)
+ expect_monitors_changed(0, 0, 1, 1000)
+ expect_monitor (position=0, width=100, height=100, scale=1, freq=60000)
+
+ add_monitor("1", width=1024, height=768, scale=1, freq=144)
+ expect_monitors_changed(1, 0, 1, 1000)
+ expect_monitor (position=1, width=1024, height=768, scale=1, freq=144000)
+
+ remove_monitor("0")
+ expect_monitors_changed(0, 1, 0, 11000) # mutter takes 10 seconds to remove it
+
+ remove_monitor("1")
+ expect_monitors_changed(0, 1, 0, 11000)
+ except AssertionError as e:
+ print("Error: {0}".format(e))
+ terminate()
+
+def mutter_appeared(name):
+ global screen_cast
+ global done
+ if verbose:
+ print("mutter appeared on the bus")
+ screen_cast = bus.get('org.gnome.Mutter.ScreenCast',
+ '/org/gnome/Mutter/ScreenCast')
+ run_commands()
+ done = True
+
+def mutter_vanished():
+ global done
+ if screen_cast != None:
+ if verbose:
+ print("mutter left the bus")
+ done = True
+
+bus = SessionBus()
+bus.watch_name('org.gnome.Mutter.ScreenCast', 0, mutter_appeared, mutter_vanished)
+
+try:
+ while not done:
+ GLib.MainContext.default().iteration(True)
+except KeyboardInterrupt:
+ print('Interrupted')
--- /dev/null
+env = environment()
+env.prepend('GI_TYPELIB_PATH',
+ project_build_root / 'gtk',
+ gi_dep.get_variable(pkgconfig: 'typelibdir'),
+)
+env.prepend('LD_PRELOAD', project_build_root / 'gtk' / 'libgtk-4.so')
+env.prepend('MESON_CURRENT_SOURCE_DIR', meson.current_source_dir())
+
+test('monitor',
+ find_program('run-headless-monitor-tests.sh', dirs: meson.current_source_dir()),
+ suite: ['headless'],
+ env: env,
+)
+
+test('input',
+ find_program('run-headless-input-tests.sh', dirs: meson.current_source_dir()),
+ suite: ['headless'],
+ env: env,
+)
--- /dev/null
+#! /bin/sh
+
+srcdir=${MESON_CURRENT_SOURCE_DIR:-./testsuite/headless}
+
+dbus-run-session sh <<EOF
+
+#echo DBUS_SESSION_BUS_ADDRESS=\$DBUS_SESSION_BUS_ADDRESS
+#echo WAYLAND_DISPLAY=gtk-test
+
+export GTK_A11Y=none
+export GIO_USE_VFS=local
+
+mutter --headless --virtual-monitor 1024x768 --no-x11 --wayland-display gtk-test2 >&mutter2.log &
+pid=\$!
+
+export WAYLAND_DISPLAY=gtk-test2
+export GDK_BACKEND=wayland
+
+python ${srcdir}/headless-input-tests.py
+status=\$?
+
+kill \$pid
+
+exit \$status
+
+EOF
--- /dev/null
+#! /bin/sh
+
+srcdir=${MESON_CURRENT_SOURCE_DIR:-./testsuite/headless}
+
+dbus-run-session sh <<EOF
+
+# echo DBUS_SESSION_BUS_ADDRESS=\$DBUS_SESSION_BUS_ADDRESS
+# echo WAYLAND_DISPLAY=gtk-test
+
+export GTK_A11Y=none
+export GIO_USE_VFS=local
+
+mutter --headless --no-x11 --wayland-display gtk-test >&mutter.log &
+pid=\$!
+
+export WAYLAND_DISPLAY=gtk-test
+export GDK_BACKEND=wayland
+
+python ${srcdir}/headless-monitor-tests.py
+status=\$?
+
+kill \$pid
+
+exit \$status
+
+EOF
subdir('a11y')
subdir('tools')
subdir('reftests')
-
if build_gir
subdir('introspection')
endif
+if wayland_enabled
+ subdir('headless')
+endif